MQTT : MQTT subscribe and publish protocol client
This module is an MQTT client for exchanging data with IoT cloud services. It supports both plain TCP and secure TLS connections.
MQTT is a Machine-to-Machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium. For example, it has been used in sensors communicating to a broker via satellite link, over occasional dial-up connections with healthcare providers, and in a range of home automation and small device scenarios. It is also ideal for mobile applications because of its small size, low power usage, minimised data packets, and efficient distribution of information to one or many receivers.
MQTT does not restrict the topic or the type of the message content, but for compatibility reasons JSRE recommends using the JSON data format for publishing and subscribing.
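As a minimal sketch of the JSON recommendation above, the helpers below serialize a value before publishing and parse a received payload. The names encodePayload and decodePayload are hypothetical and not part of the mqtt module, and the decoder assumes the payload is a string or a Buffer whose toString() yields UTF-8 text.

```javascript
// Hypothetical helpers, not part of the mqtt module.

// Serialize a value to a JSON string suitable for passing to client.publish().
function encodePayload(value) {
  return JSON.stringify(value);
}

// Parse a received payload (String or Buffer).
// Returns undefined when the payload is not valid JSON.
function decodePayload(payload) {
  try {
    return JSON.parse(payload.toString());
  } catch (error) {
    return undefined;
  }
}
```

With these helpers, a call such as client.publish('topic1', encodePayload({ temp: 21.5 })) would send the JSON text {"temp":21.5}.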
Use the following code to import the mqtt module.
var mqtt = require('mqtt');
Support
The following table shows the mqtt module APIs available in each permission mode.
| API | User Mode | Privilege Mode |
|---|---|---|
| mqtt.open | ● | ● |
| mqtt.mode | ● | ● |
| client.close | ● | ● |
| client.connect | ● | ● |
| client.isConnected | ● | ● |
| client.isQueued | ● | ● |
| client.publish | ● | ● |
| client.subscribe | ● | ● |
| client.unsubscribe | ● | ● |
Mqtt Object
mqtt.open(saddr[, tlsOpt[, timeout]])
- saddr {Object} Server socket address.
- tlsOpt {Object} TLS secure connection options. default: undefined, which means a plain TCP connection is used.
- timeout {Integer} Time to wait for the synchronous connection, in milliseconds. default: undefined, which means the default connect timeout setting is used.
- Returns: {Object} An MQTT client object.
saddr
Please consult the socket module documentation.
tlsOpt
Please consult the tls module documentation.
Create an MQTT client and connect to the specified server in synchronous mode.
Example
- TCP
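var socket = require('socket');
var mqtt = require('mqtt');

// Passing undefined as tlsOpt selects a plain TCP connection.
var serAddr = socket.sockaddr('192.168.0.1', 8010);
var client = mqtt.open(serAddr, undefined, 5000);
if (!client) {
  console.log('Can not connect to broker.');
}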
- TLS
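var fs = require('fs');
var socket = require('socket');
var mqtt = require('mqtt');

var serAddr = socket.sockaddr('192.168.0.1', 8020);
// TLS options: verify the server certificate against the given CA.
var tlsOpt = {
  rejectUnauthorized: true,
  ca: fs.readString('./ca.pem'),
  server: 'xxx.xxx.com'
};
var client = mqtt.open(serAddr, tlsOpt, 5000);
if (!client) {
  console.log('Can not connect to broker.');
}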
mqtt.open(saddr, tlsOpt, callback[, bufSize])
- saddr {Object} Server socket address.
- tlsOpt {Object} TLS secure connection options. undefined means a TCP connection.
- callback {Function} Connected callback function.
  - client {Object} Client object.
  - remote {Object} Remote address.
- bufSize {Integer} Buffer size (512 ~ 4096). default: 2048.
- Returns: {Object} An MQTT client object.
Create an MQTT client and connect to the specified server in asynchronous mode.
Example
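var dns = require('dns');
var socket = require('socket');
var mqtt = require('mqtt');

// Assumes dns.lookup() takes a bare host name rather than a URL.
var serIp = dns.lookup('mybroker.com');
var serAddr = socket.sockaddr(serIp, 8010);
var client = mqtt.open(serAddr, undefined, (client, remote) => {
  if (remote) {
    // TCP is connected!
  } else {
    // TCP connect error!
  }
});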
mqtt.mode()
- Returns: {String} Operating mode.
Get the MQTT working mode of the current process:
- 'off' MQTT is not enabled.
- 'listener' MQTT listen-only mode.
- 'publisher' MQTT can subscribe and publish.
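The mode strings above can be checked before attempting an operation. The following is a minimal sketch; modeAllows is a hypothetical helper, not part of the mqtt module, and it assumes that 'listener' permits subscribing but not publishing.

```javascript
// Hypothetical helper, not part of the mqtt module.
// Maps the documented mode strings to the actions they permit.
// Assumption: 'listener' (listen only) allows subscribing but not publishing.
function modeAllows(mode, action) {
  var allowed = {
    off: [],
    listener: ['subscribe'],
    publisher: ['subscribe', 'publish']
  };
  // Unknown mode strings permit nothing.
  var known = Object.prototype.hasOwnProperty.call(allowed, mode);
  var actions = known ? allowed[mode] : [];
  return actions.indexOf(action) !== -1;
}
```

An application could then guard a publish with a check such as modeAllows(mqtt.mode(), 'publish').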
Client Object
client.close()
Close the connection with server.
client.connect(clientOpt[, callback])
- clientOpt {Object} MQTT connect parameters.
- callback {Function} Connected callback.
  - client {Object} Client object.
clientOpt
MQTT connect parameters can include the following members:
- client {Buffer} | {String} Client id. The broker identifies each client by its client id.
- user {String} Optional. User name used when connecting to a broker.
- passwd {String} Optional. Password used for authentication when connecting to a broker.
- keepalive {Integer} Keepalive time in seconds. If no data is sent on the connection within this time window, the broker disconnects the client.
- will {Boolean} Optional. If this flag is set to true, a message and a topic must be provided, together with a QoS value between 0 and 2.
- qos {Integer} If will is set to true, the message will be sent with the given QoS.
- topic {String} Only processed when will is set to true. The topic the message should be sent to.
- message {String} | {Buffer} Only processed when will is set to true. The message that the broker publishes when the connection is broken.
Connect to the MQTT service with the specified parameters.
Example
/* MQTT Client Options */
var options = { client: 'mybroker', user: 'admin', passwd: 'password', will: false, topic: '', message: '', keepalive: 60, qos: 0 };
client.connect(options, (client) => {
  // MQTT Connected!
  client.publish(...);
});
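The rule that a will requires a topic, a message and a QoS value between 0 and 2 can be enforced before calling client.connect(). The sketch below is one way to do that; buildConnectOptions and its argument layout are hypothetical and not part of the mqtt module.

```javascript
// Hypothetical helper, not part of the mqtt module.
// Builds a clientOpt object and checks the will-related members.
function buildConnectOptions(clientId, keepalive, will) {
  var opts = { client: clientId, keepalive: keepalive, will: false };
  if (will) {
    if (typeof will.topic !== 'string' || will.topic.length === 0) {
      throw new TypeError('will.topic must be a non-empty string');
    }
    if (will.message === undefined) {
      throw new TypeError('will.message is required');
    }
    if (will.qos !== 0 && will.qos !== 1 && will.qos !== 2) {
      throw new RangeError('will.qos must be 0, 1 or 2');
    }
    opts.will = true;
    opts.topic = will.topic;
    opts.message = will.message;
    opts.qos = will.qos;
  }
  return opts;
}
```

The returned object can be passed directly as clientOpt, for example client.connect(buildConnectOptions('dev-1', 60, { topic: '/status', message: 'offline', qos: 1 })).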
client.isConnected()
- Returns: {Boolean} Current MQTT connection status: true if connected, otherwise false.
Get current connection status.
client.isQueued()
- Returns: {Boolean} Whether the send queue contains messages that are not acknowledged by the server.
Check whether the send queue contains messages that have not yet been acknowledged by the server.
client.publish(topic, message[, options[, callback]])
- topic {String} Message topic.
- message {Buffer} | {String} Message content.
- options {Object} Publish options. default: qos is 0, retain is false.
- callback {Function} Published callback function.
  - error {Error} Error information for the send operation. undefined means success.
options
MQTT publish parameters can include the following members:
- qos {Integer} Optional. default: 0.
- retain {Boolean} Optional. If retain is true, the broker stores the message for clients that subscribe with the retain flag set to true, so they can receive it later. default: false.
Publish a message with the specified options.
Example
client.publish('topic1', 'message1');
client.publish('topic2', 'message2', { qos: 1 });
client.publish('topic3', 'message3', { qos: 1 }, (error) => {
  if (error) {
    console.log('Publish error:', error);
  }
});
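The documented publish defaults (qos 0, retain false) can be made explicit before a call. The following is a minimal sketch; normalizePublishOptions is a hypothetical helper, not part of the mqtt module, and it assumes that only the QoS levels 0, 1 and 2 are valid.

```javascript
// Hypothetical helper, not part of the mqtt module.
// Fills in the documented defaults and rejects QoS values outside 0..2.
function normalizePublishOptions(options) {
  var opts = options || {};
  var qos = opts.qos === undefined ? 0 : opts.qos;
  if (qos !== 0 && qos !== 1 && qos !== 2) {
    throw new RangeError('qos must be 0, 1 or 2');
  }
  return { qos: qos, retain: opts.retain === true };
}
```

For instance, client.publish('topic2', 'message2', normalizePublishOptions({ qos: 1 })) would publish with qos 1 and retain false.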
client.subscribe(topic[, options[, callback]])
- topic {String} Topic to subscribe to.
- options {Object} Subscribe options. default: qos is 0, retain is false.
- callback {Function} Subscribe callback function.
  - error {Error} Error information for the subscribe operation. undefined means success.
options
MQTT subscribe parameters can include the following members:
- qos {Integer} Optional. default: 0.
- retain {Boolean} Optional. If retain is true, the client receives the messages that were sent to the desired topic before it connected. default: false.
The client subscribes to a given topic. If there are messages available on the topic, the client emits a message event with the message received from the broker.
Example
client.subscribe('topic1', { qos: 1 });
client.unsubscribe(topic[, callback])
- topic {String} Topic to unsubscribe from.
- callback {Function} Unsubscribe callback function.
  - error {Error} Error information for the unsubscribe operation. undefined means success.
Unsubscribes the client from a given topic. If QoS was enabled for the subscription, the server still sends the remaining packets.
Example
client.unsubscribe('topic1');
Events
The client object inherits from the EventEmitter class. The following events are emitted in specific situations.
connect
Emitted when the client successfully connects to a MQTT broker.
disconnect
A disconnect event is emitted when the network link is broken. The client then calls client.close() automatically.
close
A close event is emitted when the broker disconnects the client gracefully.
error
If an error occurs and no callback function is called to report it, an error event is emitted with the error information. If this event has no listeners, a Task.uncaughtException() exception is thrown.
message
When data is received from the server, a message event is emitted with a data object. It has the following properties:
- topic {String} The topic the message was published on.
- message {Buffer} The message the broker sent.
- qos {Integer} The QoS level the message was sent with.
- packetId {Integer} The id of the packet if QoS was enabled.
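As a sketch of how the message event might be consumed, the code below routes incoming messages to per-topic handlers. It runs against a plain EventEmitter standing in for a connected client, which assumes a Node.js-style events module is available; routeMessages and fakeClient are hypothetical names, not part of the mqtt module.

```javascript
// Assumption: a Node.js-style 'events' module. A plain EventEmitter stands in
// for a connected MQTT client so the sketch can run without a broker.
var EventEmitter = require('events');

// Hypothetical helper: route 'message' events to per-topic handlers.
function routeMessages(client, handlers) {
  client.on('message', function (data) {
    // Only topics registered in handlers are dispatched; others are ignored.
    if (Object.prototype.hasOwnProperty.call(handlers, data.topic)) {
      handlers[data.topic](data.message.toString(), data);
    }
  });
}

var fakeClient = new EventEmitter();
var received = [];
routeMessages(fakeClient, {
  '/testing': function (text) { received.push(text); }
});

// Simulate the broker delivering two messages; only '/testing' has a handler.
fakeClient.emit('message', { topic: '/testing', message: Buffer.from('hello'), qos: 0 });
fakeClient.emit('message', { topic: '/other', message: Buffer.from('ignored'), qos: 0 });
console.log(received);
```

With a real client, the same routeMessages(client, handlers) call would be made once after mqtt.open() succeeds, before subscribing to the topics of interest.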
Example
Here is a typical example of an MQTT client.
var iosched = require('iosched');
var socket = require('socket');
var mqtt = require('mqtt');

// Server address
var server = socket.sockaddr('192.168.0.2', 61613);

// Sequence number appended to each published message
var seqNumber = 0;

// MQTT client
var client = null;

// MQTT client options
var options = { client: 'mybroker', user: 'admin', passwd: 'password', keepalive: 60 };

// Open the MQTT client (synchronous mode; undefined tlsOpt means TCP)
function clientOpen() {
  client = mqtt.open(server, undefined, 5000);
  if (!client) {
    console.log(`Can not open ${server.addr} MQTT server!`);
    return false;
  } else {
    console.log(`${server.addr} MQTT server opened!`);
    client.on('message', function(data) {
      console.log(`received a message from mqtt, topic: ${data.topic}, message: ${data.message}`);
    });
    // Retry the whole open/connect sequence when the link is lost.
    client.on('disconnect', function() {
      console.error('Lost broker!');
      setTimeout(clientConnectTimeout, 2000);
    });
    return true;
  }
}

// MQTT publish timer
function clientPublishTimeout() {
  if (client) {
    try {
      if (client.isConnected()) {
        client.publish('/testing', 'test' + seqNumber);
        seqNumber++;
      }
    } catch (error) {
      console.log('Publish error:', error);
    }
  }
}

// MQTT connect timer
function clientConnectTimeout() {
  if (clientOpen()) {
    console.log(`${server.addr} MQTT server connecting!`);
    client.connect(options, () => {
      console.log(`${server.addr} MQTT server connected!`);
      client.subscribe('/testing', { qos: 1 }, (error) => {
        if (error) {
          console.error('Subscribe error:', error);
          return;
        }
        console.log(`${server.addr} MQTT server subscribed!`);
      });
    });
  } else {
    // Open failed: try again in 2 seconds.
    setTimeout(clientConnectTimeout, 2000);
  }
}

// Connect
setImmediate(clientConnectTimeout);

// Publish a test message every 5 seconds.
setInterval(clientPublishTimeout, 5000);

// Asynchronous event loop
iosched.forever();